fdddf59f387d20e2893156005688bb4d2c4cea77,gradoop-flink/src/main/java/org/gradoop/flink/model/impl/operators/matching/simulation/dual/DualSimulation.java,DualSimulation,simulateBulk,#DataSet#,178

Before Change


   */
  private DataSet<FatVertex> simulateBulk(DataSet<FatVertex> vertices) {

    if (LOG.isDebugEnabled()) {
      vertices = vertices
        .map(new PrintFatVertex(false, "iteration start"))
        .withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
        .withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
    }

    // ITERATION HEAD
    IterativeDataSet<FatVertex> workSet = vertices.iterate(Integer.MAX_VALUE);

    // ITERATION BODY

    // validate neighborhood of each vertex and create deletions
    DataSet<Deletion> deletions = workSet
      .filter(new UpdatedFatVertices())
      .flatMap(new ValidateNeighborhood(getQuery()));

    if (LOG.isDebugEnabled()) {
      deletions = deletions
        .map(new PrintDeletion(true, "deletion"))
        .withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
        .withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
    }

    // combine deletions to message
    DataSet<Message> combinedMessages = deletions
      .groupBy(0)
      .combineGroup(new CombinedMessages());

    if (LOG.isDebugEnabled()) {
      combinedMessages = combinedMessages
        .map(new PrintMessage(true, "combined"))
        .withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
        .withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
    }

    // group messages to final message
    DataSet<Message> messages = combinedMessages
      .groupBy(0)
      .reduceGroup(new GroupedMessages());

    if (LOG.isDebugEnabled()) {
      messages = messages
        .map(new PrintMessage(true, "grouped"))
        .withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)
        .withBroadcastSet(getEdgeMapping(), Printer.EDGE_MAPPING);
    }

    // update candidates and build next working set
    DataSet<FatVertex> nextWorkingSet = workSet
      .leftOuterJoin(messages)
      .where(0).equalTo(0) // vertexId == recipientId
      .with(new UpdateVertexState(getQuery()))
      .filter(new ValidFatVertices());

    if (LOG.isDebugEnabled()) {
      nextWorkingSet = nextWorkingSet
        .map(new PrintFatVertex(true, "next workset"))
        .withBroadcastSet(getVertexMapping(), Printer.VERTEX_MAPPING)

After Change


   */
  private DataSet<FatVertex> simulateBulk(DataSet<FatVertex> vertices) {

    vertices = log(vertices, new PrintFatVertex(false, "iteration start"),
      getVertexMapping(), getEdgeMapping());

    // ITERATION HEAD
    IterativeDataSet<FatVertex> workSet = vertices.iterate(Integer.MAX_VALUE);

    // ITERATION BODY

    // validate neighborhood of each vertex and create deletions
    DataSet<Deletion> deletions = workSet
      .filter(new UpdatedFatVertices())
      .flatMap(new ValidateNeighborhood(getQuery()));

    deletions = log(deletions, new PrintDeletion(true, "deletion"),
      getVertexMapping(), getEdgeMapping());

    // combine deletions to message
    DataSet<Message> combinedMessages = deletions
      .groupBy(0)
      .combineGroup(new CombinedMessages());

    combinedMessages = log(combinedMessages, new PrintMessage(true, "combined"),
      getVertexMapping(), getEdgeMapping());

    // group messages to final message
    DataSet<Message> messages = combinedMessages
      .groupBy(0)
      .reduceGroup(new GroupedMessages());

    messages = log(messages, new PrintMessage(true, "grouped"),
      getVertexMapping(), getEdgeMapping());

    // update candidates and build next working set
    DataSet<FatVertex> nextWorkingSet = workSet
      .leftOuterJoin(messages)
      .where(0).equalTo(0) // vertexId == recipientId
      .with(new UpdateVertexState(getQuery()))